[Hadoop] Join operations in practice with the distributed processing framework MapReduce

Map Join operations and Reduce Join operations

Posted by 李玉坤 on 2017-09-02

Map Join operations

In a map join, the small table (file A) is shipped to every map task through the distributed cache and loaded into an in-memory map in setup(); each record of the large table (file B) is then looked up against that map directly in map(), so the join finishes on the map side with no reduce phase (the driver below sets the number of reduce tasks to 0).

Mapper class

package com.kun.map_join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

public class LeftOutJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private HashMap<String, String> city_info = new HashMap<String, String>();
    private Text outPutKey = new Text();
    private Text outPutValue = new Text();
    private String mapInputStr = null;
    private String[] mapInputSplit = null;
    private String city_secondPart = null;

    /**
     * Runs once before any map() call in this task. It pulls file A from the
     * distributed cache and loads its records into the in-memory map.
     */
    @Override
    protected void setup(Context context) throws IOException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();

        BufferedReader br = new BufferedReader(new FileReader(path));
        String cityInfo = null;
        while (null != (cityInfo = br.readLine())) {
            String[] cityPart = cityInfo.split("\\|");
            if (cityPart.length == 5) {
                city_info.put(cityPart[0], cityPart[1] + "\t" + cityPart[2] + "\t" + cityPart[3] + "\t" + cityPart[4]);
            }
        }
        br.close();
    }

    /**
     * The map side is straightforward: for each record of B, check whether its
     * cityID exists in the in-memory map; if so, emit the joined record.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String pathName = fileSplit.getPath().getName();
        if (pathName.endsWith("B.txt")) {

            if (value == null || value.toString().equals("")) {
                return;
            }

            mapInputStr = value.toString();
            mapInputSplit = mapInputStr.split("\\|");
            // filter out malformed records
            if (mapInputSplit.length != 4) {
                return;
            }
            // check whether the join key exists in the in-memory map
            city_secondPart = city_info.get(mapInputSplit[3]);
            if (city_secondPart != null) {
                this.outPutKey.set(mapInputSplit[3]);
                this.outPutValue.set(city_secondPart + "\t" + mapInputSplit[0] + "\t" + mapInputSplit[1] + "\t" + mapInputSplit[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }
}

Driver class

package com.kun.map_join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class LeftOutJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        System.setProperty("hadoop.home.dir", "D:\\software\\master\\winutils-master\\hadoop-2.6.0");

        Configuration configuration = new Configuration();

        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path("output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setNumReduceTasks(0);                                     // map-only job
        job.addCacheFile(new URI("file:///D:/other/testjoin/A.txt")); // ship the small table via the distributed cache

        FileInputFormat.setInputPaths(job, new Path("D:\\other\\testjoin")); // map input path
        FileOutputFormat.setOutputPath(job, new Path("output"));             // job output path

        job.setJarByClass(LeftOutJoinDriver.class);
        job.setMapperClass(LeftOutJoinMapper.class);

        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // default output format

        // map output key type
        job.setMapOutputKeyClass(Text.class);

        // job output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
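
The driver above reads A.txt straight from the local file system, which is convenient for running in local mode on Windows. On a real cluster the cached file is localized onto each node, so a common pattern is to register it with a fragment (symlink) name and read that name in setup(). A minimal sketch, assuming A.txt has been uploaded to HDFS at /data/testjoin/A.txt (both the HDFS path and the symlink name "cityfile" are illustrative, not from the original code):

// In the driver: the "#cityfile" fragment makes each task see the cached file
// as a local symlink named "cityfile" in its working directory.
job.addCacheFile(new URI("hdfs:///data/testjoin/A.txt#cityfile"));

// In the mapper's setup(): read the localized copy by its symlink name.
@Override
protected void setup(Context context) throws IOException {
    BufferedReader br = new BufferedReader(new FileReader("cityfile"));
    String line;
    while ((line = br.readLine()) != null) {
        String[] parts = line.split("\\|");
        if (parts.length == 5) {
            city_info.put(parts[0], parts[1] + "\t" + parts[2] + "\t" + parts[3] + "\t" + parts[4]);
        }
    }
    br.close();
}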

Reduce Join operations

Here it is worth looking at the shortcomings of the reduce join. The reason reduce join exists at all is easy to see: the input data is split, and each map task only processes part of it, so no single map task can see all the records that share a join key. We therefore use the join key as the reduce-side grouping key, so that all records with the same join key are brought together in one reducer and joined there. The obvious drawback is that this pushes a large amount of data from the map side to the reduce side during the shuffle, which makes it far less efficient than a map join.
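
To make the mechanism concrete, here is what one reduce group looks like with the sample files A.txt and B.txt listed at the end of this post: every record is tagged with its source file and routed by join key, so for join key 1 the group holds one A-side record and two B-side records, which are cross-joined into two output rows.

join key = 1
  from A.txt (flag 0): 长春  1  901  1
  from B.txt (flag 1): 1  2G  123
                       3  3G  555
output:
  1	长春	1	901	1	1	2G	123
  1	长春	1	901	1	3	3G	555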
Dependencies

<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>

<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>

<!-- Hadoop client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

Custom CombineValues class

package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CombineValues implements WritableComparable<CombineValues> {

    private Text joinKey;    // join key
    private Text flag;       // flag marking which source file the record came from
    private Text secondPart; // the rest of the record, excluding the join key

    public Text getJoinKey() {
        return joinKey;
    }

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public Text getFlag() {
        return flag;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        this.joinKey.write(dataOutput);
        this.flag.write(dataOutput);
        this.secondPart.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.joinKey.readFields(dataInput);
        this.flag.readFields(dataInput);
        this.secondPart.readFields(dataInput);
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ",joinKey=" + this.joinKey.toString() + ",secondPart=" + this.secondPart.toString() + "]";
    }
}

Mapper class

package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
    private CombineValues combineValues = new CombineValues();
    private Text flag = new Text();
    private Text joinkey = new Text();
    private Text secondPart = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // get the name of the file this split comes from
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String pathName = fileSplit.getPath().getName();
        // the record comes from file A: tag it with flag 0
        if (pathName.endsWith("A.txt")) {
            String[] splits = value.toString().split("\\|");
            // filter out malformed records
            if (splits.length != 5) {
                return;
            }
            flag.set("0");
            joinkey.set(splits[0]);
            secondPart.set(splits[1] + "\t" + splits[2] + "\t" + splits[3] + "\t" + splits[4]);

            combineValues.setFlag(flag);
            combineValues.setJoinKey(joinkey);
            combineValues.setSecondPart(secondPart);
            context.write(combineValues.getJoinKey(), combineValues);

        } else if (pathName.endsWith("B.txt")) { // the record comes from file B: tag it with flag 1
            String[] splits = value.toString().split("\\|");
            // filter out malformed records
            if (splits.length != 4) {
                return;
            }
            flag.set("1");
            joinkey.set(splits[3]);
            secondPart.set(splits[0] + "\t" + splits[1] + "\t" + splits[2]);

            combineValues.setFlag(flag);
            combineValues.setJoinKey(joinkey);
            combineValues.setSecondPart(secondPart);
            context.write(combineValues.getJoinKey(), combineValues);
        }
    }
}

Reducer class

package com.kun.mapjion;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;

public class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
    private static final Logger logger = LoggerFactory.getLogger(LeftOutJoinReducer.class);

    // records of the left table (A) within one group
    private ArrayList<Text> leftTable = new ArrayList<Text>();
    // records of the right table (B) within one group
    private ArrayList<Text> rightTable = new ArrayList<Text>();
    private Text secondPar = null;
    private Text output = new Text();

    /**
     * reduce() is called once per group, i.e. once per join key.
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<CombineValues> values, Context context) throws IOException, InterruptedException {
        leftTable.clear();
        rightTable.clear();

        /**
         * Split the group's records into the two lists according to their source file.
         * Caveat: if a single group contains too many records, buffering them here can
         * cause an OOM in the reduce phase. Before tackling a distributed problem it pays
         * to understand how the data is distributed and pick an approach that fits it;
         * that is the best way to avoid OOMs and severe data skew.
         */
        for (CombineValues cv : values) {
            // make a defensive copy: Hadoop reuses the value object across iterations
            secondPar = new Text(cv.getSecondPart().toString());
            // left table
            if ("0".equals(cv.getFlag().toString().trim())) {
                leftTable.add(secondPar);
            }
            // right table
            else if ("1".equals(cv.getFlag().toString().trim())) {
                rightTable.add(secondPar);
            }
        }

        System.out.println("A:" + leftTable.toString());
        System.out.println("B:" + rightTable.toString());

        // cross-join the two sides of the group
        for (Text leftPart : leftTable) {
            for (Text rightPart : rightTable) {
                output.set(leftPart + "\t" + rightPart);
                context.write(key, output);
            }
        }
    }
}
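
Note that, despite the class name, the nested loops above emit output only when a group contains records from both files, which is effectively an inner join. If a true left outer join is wanted, a minimal sketch of the final loop could look like the following (padding B's three non-key columns with "NULL" is an illustrative choice, not part of the original code):

// Left-outer variant (sketch): keep A-side rows even when B has no match for this key.
for (Text leftPart : leftTable) {
    if (rightTable.isEmpty()) {
        // no B record for this join key: pad the three B columns with NULL
        output.set(leftPart + "\tNULL\tNULL\tNULL");
        context.write(key, output);
    } else {
        for (Text rightPart : rightTable) {
            output.set(leftPart + "\t" + rightPart);
            context.write(key, output);
        }
    }
}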

Driver class

package com.kun.mapjion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class LeftOutJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        System.setProperty("hadoop.home.dir", "D:\\hadoop2.6_Win_x64-master");

        Configuration configuration = new Configuration();

        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path("output");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LeftOutJoinDriver.class);

        FileInputFormat.setInputPaths(job, new Path("D:/Documents/test2/")); // map input path
        FileOutputFormat.setOutputPath(job, new Path("output"));             // job output path

        job.setMapperClass(LeftOutJoinMapper.class);
        job.setReducerClass(LeftOutJoinReducer.class);

        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // default output format

        // map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        // job output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}

Result

Each output row is the join key, followed by the city fields from A and the user fields from B:

1	长春	1	901	1	3	3G	555
1	长春	1	901	1	1	2G	123
2	吉林	2	902	1	2	3G	333
3	四平	3	903	1	4	2G	777
4	松原	4	904	1	5	3G	666

File A.txt
id name orderid city_code is_show
0|其他|9999|9999|0
1|长春|1|901|1
2|吉林|2|902|1
3|四平|3|903|1
4|松原|4|904|1
5|通化|5|905|1
6|辽源|6|906|1
7|白城|7|907|1
8|白山|8|908|1
9|延吉|9|909|1

File B.txt
userID network flow cityID
1|2G|123|1
2|3G|333|2
3|3G|555|1
4|2G|777|3
5|3G|666|4